package com.flink.java.demo.transform;

import com.flink.java.demo.FlinkConstant;
import com.flink.java.demo.bean.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 富有函数
 *
 * “富函数”是DataStream API提供的一一个函数类的接口，所有Flink函数类都有其Rich版本。它与常规函数的不同在于，可以获取运行环境的上下文，并拥有一些生命周期方法，所以可以实现更复杂的功能。
 *
 */
public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // 从文件读取数据
        DataStream<String> inputStream = env.readTextFile(FlinkConstant.sensorPath);

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 转换成Tuple2类型
        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map( new MyMapper() );

        resultStream.print();

        env.execute();
    }

    public static class MyMapper0 implements MapFunction<SensorReading, Tuple2<String, Integer>>{
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(), value.getId().length());
        }
    }

    /**
     *  RichXXXFunction: 富函数
     * 1、多了生命周期管理方法：
     *    open(): 每个子任务，在启动时，调用一次
     *    close():每个子任务，在结束时，调用一次
     *      => 如果是flink程序异常挂掉，不会调用close
     *      => 如果是正常调用 cancel命令，可以close
     * 2、多了一个 运行时上下文
     *    可以获取一些运行时的环境信息，比如 子任务编号、名称、其他的.....
     */
    public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>>{
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
//            getRuntimeContext().getState();
            return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
        }

        // 初始化工作，一般是定义状态，或者建立数据库连接
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("open");
        }

        // 一般是关闭连接和清空状态的收尾操作
        @Override
        public void close() throws Exception {
            System.out.println("close");
        }
    }
}
